package com.huan.flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

import java.nio.charset.StandardCharsets;

/**
 * DataSet Api 实现一个简单的 word count 案例， 官方推荐使用 DataStream Api来实现，使用DataStream Api
 * 在提交任务的时候使用 bin/flink run -Dexecution.runtime-mode=BATCH DataSetApiWordCount.jar
 * 批处理： 一起处理，不是读取一条就处理一条，输出一条，而是一起处理。
 * @author huan.fu
 * @date 2023/9/13 - 20:15
 */
public class DataSetApiWordCount {

    public static void main(String[] args) throws Exception {
        //  1、创建执行环境
        ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
        // 2、从文件中读取数据
        String filePath = "/Users/huan/code/IdeaProjects/me/spring-cloud-parent/flink/flink-wordcount-dataset-api/src/main/resources/word.txt";
        environment.readTextFile(filePath, StandardCharsets.UTF_8.name())
                // 3、将读取到一行数据进行切割、转换
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
                        // 每一行以空格进行分隔
                        String[] words = line.split(" ");
                        for (String word : words) {
                            // 使用 collector 向下游发送数据
                            collector.collect(Tuple2.of(word, 1));
                        }
                    }
                })
                // 4、根据 word 进行分组 0表示的是 Tuple2对象中的第一个字段的值
                .groupBy(0)
                // 5、各分组内进行聚合 1表示的是 Tuple2对象中的第二个字段的值
                .sum(1)
                // 6、输出
                .print();
    }
}
